package com.ssm.config;

import com.ssm.listener.KafkaConsumeListener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;

import java.util.HashMap;
import java.util.Map;

/**
 * 功能描述:
 *
 * @author lixiaomeng
 * @date 2018/12/11
 */
@Configuration
@EnableKafka
@PropertySource(value = "classpath:kafka.properties")
public class KafkaConfig {

    @Value("${bootstrap.servers}")
    private String bootstrapServers;

    @Value("${acks}")
    private String acks;

    @Value("${retries}")
    private String retries;

    @Value("${batch.size}")
    private String batchSize;

    @Value("${linger.ms}")
    private String lingerMs;

    @Value("${max.in.flight.requests.per.connection}")
    private String maxInFlightRequestPerConnection;

    @Value("${key.serializer}")
    private String keySerializer;

    @Value("${value.serializer}")
    private String valueSerializer;

    /**
     * Kafka producer configuration, assembled from the externalized
     * {@code kafka.properties} values injected above.
     *
     * @return producer property map keyed by {@link ProducerConfig} constants
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.ACKS_CONFIG, acks);
        // FIX: retries and batch.size were injected via @Value but never
        // applied, so the externalized properties were silently ignored.
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        // NOTE(review): the key serializer class comes from kafka.properties,
        // while the factories below are typed <Integer, String> — confirm the
        // configured key.serializer actually matches Integer keys.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, maxInFlightRequestPerConnection);
        return props;
    }

    /**
     * Producer factory built from {@link #producerConfigs()}.
     *
     * @return factory that creates Kafka producers with Integer keys and String values
     */
    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Spring's {@link KafkaTemplate}, which wraps the native KafkaProducer.
     * Producers send messages by calling one of the template's {@code send} methods.
     *
     * @return template bound to {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        return new KafkaTemplate<Integer, String>(producerFactory());
    }

    /**
     * Kafka consumer configuration.
     *
     * @return consumer property map keyed by {@link ConsumerConfig} constants
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "springKafka");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Auto-commit consumer offsets every 500 ms.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 500);
        // Upper bound on records returned per poll (relevant for batch listeners).
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        return props;
    }

    /**
     * Consumer factory built from {@link #consumerConfigs()}.
     *
     * @return factory that creates Kafka consumers with Integer keys and String values
     */
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        // FIX: was a raw-typed DefaultKafkaConsumerFactory; use the generic
        // form to avoid unchecked-conversion warnings.
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Listener container factory for single-record consumption
     * (batch listening disabled — records are delivered one at a time).
     *
     * @return container factory exposed under the bean name {@code kafkaOneListener}
     */
    @Bean(name = "kafkaOneListener")
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Number of concurrent listener threads per container.
        factory.setConcurrency(3);

        // Single-record mode: Spring dispatches one message per listener call.
        factory.setBatchListener(false);

        return factory;
    }

    /**
     * Listener container factory for batch consumption: each listener call
     * receives up to {@code max.poll.records} messages at once.
     *
     * @return container factory exposed under the bean name {@code kafkaBatchListener}
     */
    @Bean(name = "kafkaBatchListener")
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaBatchListener() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        // Number of concurrent listener threads per container.
        factory.setConcurrency(3);

        // Batch mode: a listener invocation gets a list of records
        // (list size bounded by max.poll.records).
        factory.setBatchListener(true);

        return factory;
    }

    /**
     * Registers the Kafka message listener with the Spring container.
     *
     * @return the {@link KafkaConsumeListener} bean
     */
    @Bean
    public KafkaConsumeListener listener() {
        return new KafkaConsumeListener();
    }

}
